KAFKA-12648: fix bug where thread is re-added to TopologyMetadata when shutting down #11857
Conversation
cc @wcarlson5
```java
log.info("StreamThread has detected an update to the topology, triggering a rebalance to refresh the assignment");
if (topologyMetadata.isEmpty()) {
    mainConsumer.unsubscribe();
}
topologyMetadata.maybeNotifyTopologyVersionWaitersAndUpdateThreadsTopologyVersion(getName());
```
All of this has been moved into taskManager#handleTopologyUpdates
```java
 */
void handleTopologyUpdates() {
    tasks.maybeCreateTasksFromNewTopologies();
    final Set<String> currentNamedTopologies = topologyMetadata.updateThreadTopologyVersion(Thread.currentThread().getName());
```
This isn't the main fix, but we were playing a little fast and loose with the topology version we were reporting as having ack'ed -- tightened this up by first atomically updating the topology version and saving the set of current named topologies, then doing the actual update handling, and finally checking the listeners and completing any finished add/remove topology requests.
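For concreteness, a sketch of the resulting flow: only the two calls shown in the diff above are real, the rest is an illustrative outline rather than the actual implementation.

```java
void handleTopologyUpdates() {
    // create tasks for any newly added named topologies
    tasks.maybeCreateTasksFromNewTopologies();

    // atomically record this thread as caught up to the latest topology version,
    // and take a snapshot of the named topologies that exist at that version
    final Set<String> currentNamedTopologies =
        topologyMetadata.updateThreadTopologyVersion(Thread.currentThread().getName());

    // ... handle the update against that snapshot, e.g. close tasks whose named
    // topology is no longer in currentNamedTopologies ...

    // and only then check the listeners, completing any add/remove topology
    // requests that have now been fully processed
}
```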
```diff
 public ReentrantLock topologyLock = new ReentrantLock();
 public Condition topologyCV = topologyLock.newCondition();
-public List<TopologyVersionWaiters> activeTopologyWaiters = new LinkedList<>();
+public List<TopologyVersionListener> activeTopologyUpdateListeners = new LinkedList<>();
```
Just renamed from waiters to listeners
Also, another quick question: why do we need to keep topologyVersion an AtomicLong? It seems that, besides the getters, all of its updaters are under the lock as well.
Good find, yeah I believe it no longer needs to be an AtomicLong, I'll change it back to long.
Oh right, actually no, we do still need it to be an AtomicLong, as we check it in the StreamThread main loop when looking for topology updates. And obviously we don't want to have to grab the full lock for that.
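In other words (a minimal sketch, assuming the getter just delegates to the atomic):

```java
// Writes to topologyVersion happen under topologyLock, but the StreamThread
// main loop polls it on every iteration, so the read must stay lock-free.
private final AtomicLong topologyVersion = new AtomicLong(0L);

public long topologyVersion() {
    return topologyVersion.get();  // cheap volatile read, no lock contention
}
```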
```java
final Iterator<TopologyVersionWaiters> iterator = version.activeTopologyWaiters.listIterator();
TopologyVersionWaiters topologyVersionWaiters;
version.topologyLock.lock();
threadVersions.put(threadName, topologyVersion());
```
@wcarlson5 / @guozhangwang / @vvcephei This is the main fix -- we need to split out the version update, where we add the current thread with the latest topology version to this threadVersions map, since that should of course only be done when we're reacting to a topology update.
The other function of the method was to check whether we could complete any of the queued listeners, which is why we were invoking this when shutting down a thread. Splitting this out into a separate method avoids ghost threads being left behind in the threadVersions map.
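To make the shape of the fix concrete, here is a simplified, self-contained sketch; the class name and method bodies are illustrative stand-ins, and only the names discussed in this thread are taken from the PR:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class TopologyMetadataSketch {
    private final AtomicLong topologyVersion = new AtomicLong(0L);
    private final Map<String, Long> threadVersions = new ConcurrentHashMap<>();

    // Called only when a live StreamThread reacts to a topology update
    void updateThreadTopologyVersion(final String threadName) {
        threadVersions.put(threadName, topologyVersion.get());
    }

    // Called both after an update and when unregistering a thread; it never
    // writes to threadVersions, so a shutting-down thread cannot re-add itself
    void maybeNotifyTopologyVersionListeners() {
        // complete any queued add/removeNamedTopology futures whose requested
        // version has now been reached by every remaining live thread
    }

    void unregisterThread(final String threadName) {
        threadVersions.remove(threadName);
        maybeNotifyTopologyVersionListeners();  // only check listeners here
    }
}
```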
```java
}
topologyVersionListener = iterator.next();
final long topologyVersionWaitersVersion = topologyVersionListener.topologyVersion;
if (minThreadVersion >= topologyVersionWaitersVersion) {
```
I also refactored this slightly to optimize/clean up this method. It's less about the optimization, since we should generally not have too many threads per KafkaStreams runtime, but I found the logic much easier to follow when we compute the minimum version across all threads and then complete all futures listening for the topology to be updated up to that version.
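A fragment sketching that refactored loop, using the names visible in the diff above (the `future` field on each listener is a hypothetical stand-in for whatever the listener completes):

```java
final long minThreadVersion = getMinimumThreadVersion();
final Iterator<TopologyVersionListener> iterator =
    version.activeTopologyUpdateListeners.listIterator();
while (iterator.hasNext()) {
    final TopologyVersionListener listener = iterator.next();
    // every live thread has caught up to this listener's requested version
    if (minThreadVersion >= listener.topologyVersion) {
        listener.future.complete(null);  // hypothetical completion hook
        iterator.remove();
    }
}
```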
I think we also want to remove the listeners for threads that were removed, right?
The listeners are for the caller threads, not stream threads, right? I thought that since the thread is removed, it would not be counted in getMinimumThreadVersion() and hence would not block the listeners from being removed.
No, the listeners are for stream threads. They get added in the task manager. Once all threads are at the version, the future blocking the calling thread is completed.
Hmm... just to make sure: we are talking about version.activeTopologyUpdateListeners, right? These listeners are for the calling thread of removeNamedTopology / addNamedTopology / start, which would get the wrapped futures these listeners are constructed on.
Anyway, my understanding is that when a thread is removed, the version returned by getMinimumThreadVersion would not take that removed thread into consideration, so even if the removed thread's version is low it would not block the future from being completed.
Ah @guozhangwang yeah the getMinimumThreadVersion should take care of it.
Yeah, I think this was already resolved, but just to clarify for anyone else reading this (or ourselves in the future): yes, the listeners are for the callers of add/removeNamedTopology 👍
guozhangwang left a comment:
@ableegoldman the change lgtm overall. But I have a meta question about our synchronization: in the TopologyMetadata class we have two synchronization mechanisms: 1) we use a lock for any changes to the TopologyVersion object, and 2) we made threadVersions a concurrent hashmap, but not all modifications (e.g. registering/deregistering a thread) require the lock from 1).
That means the TopologyVersion object may not always be consistent with the threadVersions map. Is this okay, or intended by our design? If not, maybe we can just make threadVersions part of the TopologyVersion and always update it under the same lock.
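For concreteness, a sketch of the alternative being suggested here; this is purely illustrative, with field declarations copied from the diff above where available and the rest assumed:

```java
// Fold threadVersions into the nested TopologyVersion class so that both
// structures are only ever mutated while holding topologyLock, keeping
// them mutually consistent.
public static class TopologyVersion {
    public ReentrantLock topologyLock = new ReentrantLock();
    public Condition topologyCV = topologyLock.newCondition();
    public AtomicLong topologyVersion = new AtomicLong(0L);
    public Map<String, Long> threadVersions = new HashMap<>();  // plain map, guarded by topologyLock
    public List<TopologyVersionListener> activeTopologyUpdateListeners = new LinkedList<>();
}
```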
All test failures are unrelated, going to merge this now
KAFKA-12648: fix bug where thread is re-added to TopologyMetadata when shutting down (apache#11857) (#674)

We used to call TopologyMetadata#maybeNotifyTopologyVersionWaitersAndUpdateThreadsTopologyVersion when a thread was being unregistered/shutting down, to check if any of the futures listening for topology updates had been waiting on this thread and could be completed. Prior to invoking this we make sure to remove the current thread from the TopologyMetadata's threadVersions map, but this thread is actually then re-added in the #maybeNotifyTopologyVersionWaitersAndUpdateThreadsTopologyVersion call.

To fix this, we should break up this method into separate calls for each of its two distinct functions: updating the version, and checking for topology update completion. When unregistering a thread, we should only invoke the latter method.

Reviewers: Guozhang Wang <guozhang@confluent.io>, Walker Carlson <wcarlson@confluent.io>
Co-authored-by: A. Sophie Blee-Goldman <sophie@confluent.io>